package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/15
 *
 * <p>Demonstrates an event-time interval join between two socket-fed {@link WaterSensor} streams.
 *
 * <p>Both inputs must be {@link KeyedStream}s: {@code intervalJoin} is only available on keyed
 * streams, and elements are paired only when their keys are equal.
 */
public class Demo9_IntervalJoin
{
    /** Host that both socket sources read from. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Socket port of the left ({@code ds1}) input. */
    private static final int LEFT_PORT = 8888;

    /** Socket port of the right ({@code ds2}) input. */
    private static final int RIGHT_PORT = 8889;

    /** Value passed as the {@code rest.port} option to the execution environment. */
    private static final int REST_PORT = 3333;

    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", REST_PORT);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(1);

        // Event time is taken from WaterSensor#getTs. Flink event timestamps are milliseconds, so
        // the +/-2 second join bounds below assume getTs() returns epoch millis -- TODO confirm.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner((e, ts) -> e.getTs());

        // The two inputs of the join, both keyed by sensor id.
        KeyedStream<WaterSensor, String> ds1 = keyedSensorStream(env, LEFT_PORT, watermarkStrategy);
        KeyedStream<WaterSensor, String> ds2 = keyedSensorStream(env, RIGHT_PORT, watermarkStrategy);

        /*
         * For every ds1 element `left`, the interval join pairs it with each ds2 element `right`
         * that has the same key and whose EVENT timestamp (not its arrival time) satisfies
         *     left.ts + lowerBound <= right.ts <= left.ts + upperBound
         * Both bounds are inclusive by default; upperBoundExclusive() / lowerBoundExclusive()
         * exclude an element that sits exactly on the corresponding bound.
         *
         * Late data:
         *   - window join:   an element is late if it arrives after its window has closed;
         *                    late elements are not computed.
         *   - interval join: an element is late if its timestamp is below the join operator's
         *                    current watermark; late elements are not joined.
         */
        ds1
            .intervalJoin(ds2)
            // .inEventTime()  // interval joins only support event time, which is the default
            .between(Time.seconds(-2L), Time.seconds(2L))
            // .upperBoundExclusive()
            // .lowerBoundExclusive()
            .process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>()
            {
                @Override
                public void processElement(WaterSensor left, WaterSensor right, ProcessJoinFunction<WaterSensor, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                    out.collect(left +"====" + right);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate rather than printStackTrace(): a failed job must not let main() return
            // normally (exit code 0) as if it had succeeded.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Builds one input of the join: text lines read from a socket on {@link #SOCKET_HOST},
     * mapped to {@link WaterSensor}, stamped with timestamps/watermarks, then keyed by sensor id.
     *
     * @param env               environment the source is added to
     * @param port              socket port to read from
     * @param watermarkStrategy timestamp assigner and watermark generator for the stream
     * @return the keyed stream, keyed by {@link WaterSensor#getId()}
     */
    private static KeyedStream<WaterSensor, String> keyedSensorStream(
        StreamExecutionEnvironment env, int port, WatermarkStrategy<WaterSensor> watermarkStrategy) {
        return env
            .socketTextStream(SOCKET_HOST, port)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(WaterSensor::getId);
    }
}
